import java.io.File;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

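/**
 * Author :  Rocky
 * Date : 24/05/2017 23:08
 * Description : Write-throughput benchmark. main() starts 10 threads, each of which fills
 *               its own 200 MB memory-mapped file, repeats this for 10 rounds and prints the
 *               elapsed time and speed. The nested request / queue / writer classes appear
 *               to be building blocks for a producer-consumer variant; main() does not use them.
 * Test :
 */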
public class MultiThreadMultiFiles_Produce_Consumer_Test {

    /**
     * Benchmark entry point.
     *
     * <p>Runs 10 rounds. In each round 10 threads are started; thread {@code i} maps the first
     * 200 MB of the file {@code "testfile" + i} (relative to the working directory) read-write
     * and fills it one byte at a time with the value 4. A round ends when all 10 writers have
     * finished. Afterwards the total elapsed time and the resulting throughput are printed.
     * The files are not deleted.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting for a round
     */
    public static void main(String[] args) throws InterruptedException {
        final int rounds = 10;
        final int writersPerRound = 10;
        final int megabytesPerFile = 200;
        final int bytesPerFile = megabytesPerFile * 1024 * 1024;

        long startMillis = System.currentTimeMillis();

        for (int round = 0; round < rounds; round++) {
            CountDownLatch latch = new CountDownLatch(writersPerRound);
            for (int i = 0; i < writersPerRound; i++) {
                int finalI = i;
                Thread t = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        byte data = 4;
                        File file = new File("testfile" + finalI);
                        // Close the file and its channel as soon as the mapping exists; a mapping
                        // stays valid after the channel that created it has been closed.
                        try (RandomAccessFile raf = new RandomAccessFile(file, "rw");
                             FileChannel channel = raf.getChannel()) {
                            MappedByteBuffer mbb = channel.map(FileChannel.MapMode.READ_WRITE, 0, bytesPerFile);

                            while (mbb.hasRemaining()) {
                                mbb.put(data);
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                        } finally {
                            // Always count down, even on an Error, so main never blocks forever in await().
                            latch.countDown();
                        }
                    }
                });
                t.start();
            }
            latch.await();
        }

        long time = (System.currentTimeMillis() - startMillis);
        // Total amount written, in MB: rounds * writers per round * MB per file (= 20000).
        long totalBytes = (long) rounds * writersPerRound * megabytesPerFile;
        System.out.println("10个线程10个文件，每个线程负责写一个文件，各写200m，循环10次， 总耗时 " + time + " ms ，速度 " + (totalBytes * 1024 / (time * 1.0 / 1000)) + " kb/s");
    }


    //
    /**
     * One queued write: a single byte value together with the
     * {@link MappedByteBufferWrapper} it is associated with.
     */
    class WriteRequest {

        /** Wrapper around the target buffer of this request. */
        private MappedByteBufferWrapper mbbWrapper;

        /** The byte value carried by this request. */
        private byte data;

        /**
         * @param data       the byte value carried by this request
         * @param mbbWrapper wrapper around the buffer this request refers to
         */
        public WriteRequest(byte data, MappedByteBufferWrapper mbbWrapper) {
            this.mbbWrapper = mbbWrapper;
            this.data = data;
        }
    }


    /**
     * Bundles a {@link MappedByteBuffer} with the thread that currently holds the right to write
     * to it. Ownership is claimed with a compare-and-set on {@link #mbbOwner} and given back with
     * {@link #releaseWriteRigth()}.
     *
     * <p>Ownership is not reentrant: a thread that already owns the buffer and calls
     * {@link #tryBecameMbbOwner()} again will fail every compare-and-set.
     */
    class MappedByteBufferWrapper {

        private final MappedByteBuffer mbb;

        /** Thread currently holding the write permission, or {@code null} when the buffer is free. */
        private final AtomicReference<Thread> mbbOwner = new AtomicReference<>();

        /** Sleep time in milliseconds after the n-th failed compare-and-set of one acquisition round. */
        private final int[] casRetryDelayLevel = new int[]{2, 3, 5, 5, 10, 10, 5, 15, 20, 50, 100, 200, 500, 1000, 2000};

        /**
         * @param mbb the buffer whose write access is guarded by this wrapper
         */
        public MappedByteBufferWrapper(MappedByteBuffer mbb) {
            this.mbb = mbb;
        }

        /**
         * Tries to make the calling thread the owner of the buffer.
         *
         * <p>Up to 3 rounds are made. In each round the compare-and-set is attempted once per entry
         * of {@link #casRetryDelayLevel}, sleeping for that entry's duration after every failure.
         * If a whole round fails, the thread that currently owns the buffer (if any) is interrupted
         * before the next round starts.
         *
         * @return {@code true} if the calling thread now owns the buffer, {@code false} if all
         *         rounds failed
         */
        private boolean tryBecameMbbOwner() {
            for (int i = 0; i < 3; i++) {
                // Try to obtain ownership of the MappedByteBuffer.
                boolean getSuccess = false;
                int casRetryCount = 0;

                while (casRetryCount < casRetryDelayLevel.length) {
                    if (mbbOwner.compareAndSet(null, Thread.currentThread())) {
                        getSuccess = true;
                        break;
                    }

                    try {
                        Thread.sleep(casRetryDelayLevel[casRetryCount]);
                    } catch (InterruptedException e) {
                        // The interrupt flag is deliberately not restored: competing writers use
                        // interrupt() as a nudge (see below), and a set flag would make every
                        // following sleep in this loop throw immediately.
                        e.printStackTrace();
                    }

                    casRetryCount++;
                }

                if (getSuccess) {
                    // casRetryCount is a valid index here: the loop was left via break before the increment.
                    System.out.println("第 " + i + " 次获取 MappedByteBuffer 写权限成功，sleep level [index=" + casRetryCount + ",value=" + casRetryDelayLevel[casRetryCount] + "] ");
                    return true;
                }

                // Still not successful: interrupt the owning thread. The owner may have released the
                // buffer since the last failed compare-and-set, so the reference can be null here.
                Thread owner = mbbOwner.get();
                if (owner != null) {
                    owner.interrupt();
                }
            }

            System.out.println("获取 MappedByteBuffer 写权限失败");
            return false;
        }


        /**
         * Gives up ownership of the buffer. Has no effect unless the calling thread is the
         * current owner.
         */
        private void releaseWriteRigth() {
            mbbOwner.compareAndSet(Thread.currentThread(), null);
        }
    }


    /**
     * Bounded queue of {@link WriteRequest}s. Both {@link #put} and {@link #take} first try the
     * non-blocking queue operation a fixed number of times and only then fall back to the
     * blocking one.
     */
    class WriteRequestQueue {

        // Created in the constructor only, because its capacity depends on queueSize.
        private final LinkedBlockingQueue<WriteRequest> queue;

        /** Number of non-blocking offer attempts before put() falls back to the blocking put. */
        private final int offerRetryCount = 10;
        /** Number of non-blocking poll attempts before take() falls back to the blocking take. */
        private final int pollRetryCount = 10;

        private final int queueSize;

        /**
         * @param queueSize capacity of the underlying queue
         * @throws IllegalArgumentException if {@code queueSize} is not greater than zero
         *         (thrown by {@link LinkedBlockingQueue})
         */
        public WriteRequestQueue(int queueSize) {
            this.queueSize = queueSize;
            this.queue = new LinkedBlockingQueue<>(this.queueSize);
        }

        /**
         * Adds a request to the queue, blocking if the queue stays full. An interrupt received
         * while blocked is logged and the blocking put is retried, so this method only returns
         * once the request has been enqueued.
         *
         * @param request the request to enqueue
         */
        private void put(WriteRequest request) {

            boolean offerSuccess = false;
            for (int i = 0; i < offerRetryCount && !offerSuccess; i++) {
                offerSuccess = queue.offer(request);
            }

            while (!offerSuccess) {
                try {
                    System.out.println("offer失败，尝试put");
                    queue.put(request);
                    offerSuccess = true;
                } catch (InterruptedException e) {
                    System.out.println("不严重异常 " + e.getMessage());
                }
            }
        }

        /**
         * Removes and returns the head of the queue, blocking if the queue stays empty. An
         * interrupt received while blocked is logged and the blocking take is retried.
         *
         * @return the next request, never {@code null}
         */
        private WriteRequest take() {
            WriteRequest writeRequest = null;

            for (int i = 0; i < pollRetryCount && writeRequest == null; i++) {
                writeRequest = queue.poll();
            }

            // If the polls above still returned nothing, fall back to the blocking take.
            while (writeRequest == null) {
                try {
                    System.out.println("poll失败，尝试take");
                    writeRequest = queue.take();
                } catch (InterruptedException e) {
                    System.out.println("不严重异常 " + e.getMessage());
                }
            }

            return writeRequest;
        }
    }


    /**
     * Defines the work done by a writer thread: it takes {@link WriteRequest}s from a
     * {@link WriteRequestQueue} in an endless loop and writes each request's byte into the
     * request's buffer once ownership of that buffer has been obtained.
     */
    class WriterRunnable implements Runnable {

        private WriteRequestQueue writeRequestQueue;

        /**
         * @param writeRequestQueue the queue this writer takes its requests from
         */
        public WriterRunnable(WriteRequestQueue writeRequestQueue) {
            this.writeRequestQueue = writeRequestQueue;
        }


        /**
         * Processes requests forever. A request is dropped (with a log line) when ownership of
         * its buffer cannot be obtained or when any {@link Throwable} is raised while handling
         * it. A request whose buffer has no space left is skipped without a message.
         */
        @Override
        public void run() {
            while (true) {
                try {
                    WriteRequest writeRequest = writeRequestQueue.take();

                    // take() only returns once it has a request, so writeRequest != null here.
                    MappedByteBufferWrapper mbbWrapper = writeRequest.mbbWrapper;

                    if (mbbWrapper.tryBecameMbbOwner()) {
                        try {
                            if (mbbWrapper.mbb.hasRemaining()) {
                                mbbWrapper.mbb.put(writeRequest.data);
                            }

                            System.out.println("");
                        } finally {
                            // Release even if the write throws; otherwise this thread would stay
                            // the owner and no other writer could ever acquire the buffer again.
                            mbbWrapper.releaseWriteRigth();
                        }
                    } else {
                        System.out.println("多次尝试获取写数据的权限都失败，丢弃该信息");
                    }
                } catch (Throwable e) {
                    System.out.println("写数据异常，将丢弃该信息 " + e.getMessage());
                }
            }
        }
    }
}
